package com.hanxiaozhang.delayqueue.no2;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;

import static java.util.concurrent.Executors.defaultThreadFactory;

/**
 * 〈一句话功能简述〉<br>
 * 〈Redis实现延迟队列〉
 *
 * @author hanxinghua
 * @create 2022/9/16
 * @since 1.0.0
 */
@Slf4j
@Component
public class RedisDelayQueue {

    private static final String DELAY_QUEUE_NAME = "test-delay-queue";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    /**
     * 添加消息到ZSet中
     *
     * @param messageId
     * @param delayTime
     */
    public void push(long messageId, long delayTime) {
        long score = System.currentTimeMillis() + delayTime;
        stringRedisTemplate.boundZSetOps(DELAY_QUEUE_NAME).add(String.valueOf(messageId), score);
    }


    @PostConstruct
    public void loopEvent() {
        // 启用一个单线程池
        ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

        singleThreadPool.execute(() -> {
            BoundZSetOperations<String, String> operations = stringRedisTemplate.boundZSetOps(DELAY_QUEUE_NAME);
            // 不断获取ZSet中过期的消息
            while (true) {
                try {
                    long current = System.currentTimeMillis();
                    // 获取小于当前时间的消息
                    Set<String> messageIdSet = stringRedisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_NAME, 0, current, 0, 1);
                    // 当前没有过期的消息，休息一下
                    if (CollectionUtils.isEmpty(messageIdSet)) {
                        Thread.sleep(500);
                        continue;
                    }
                    Iterator<String> iterator = messageIdSet.iterator();
                    String messageId = iterator.next();
                    // 删除该messageId
                    if (operations.remove(messageId) > 0) {
                        // 处理具体业务逻辑
                        handleDelayMessage(messageId);
                    }
                } catch (Exception e) {
                    log.error("消息消费异常", e);
                }
            }
        });
    }


    /**
     * 具体业务逻辑
     *
     * @param messageId
     */
    private void handleDelayMessage(String messageId) {
    }


}
